In [1]:
%run startup.py
In [2]:
%%javascript
$.getScript('./assets/js/ipython_notebook_toc.js')
source: http://reactivex.io/documentation/operators.html#tree.
(transcribed to RxPY 1.5.7, Py2.7 / 2016-12, Gunther Klessinger, axiros)
This tree can help you find the ReactiveX Observable operator you’re looking for.
See Part 1 for Usage and Output Instructions.
We also require acquaintance with the marble diagrams feature of RxPy.
This is basically multicast.
In [3]:
rst(O.publish)
def emit(obs):
log('.........EMITTING........')
sleep(0.1)
obs.on_next(rand())
obs.on_completed()
rst(title='Reminder: 2 subscribers on a cold stream:')
s = O.create(emit)
d = subs(s), subs(s.delay(100))
rst(title='Now 2 subscribers on a PUBLISHED (hot) stream', sleep=0.4)
sp = s.publish()
subs(sp, name='subs1')
subs(sp.delay(100), name='subs2')
log('now connect')
# this creates a 'single, intermediate subscription between stream and subs'
d = sp.connect()
# will only see the finish, since subscribed too late
d = subs(sp, name='subs3')
In [4]:
rst(O.publish_value)
def sideeffect(*x):
log('sideffect', x)
print('Everybody gets the initial value and the events, sideeffect only once per ev')
src = O.interval(500).take(20).do_action(sideeffect)
published = src.publish_value(42)
subs(published), subs(published.delay(100))
d = published.connect()
sleep(1.3)
log('disposing now')
d.dispose()
In [36]:
# not yet in RXPy
RxPY also has a multicast operator which operates on an ordinary Observable, multicasts that Observable by means of a particular Subject that you specify, applies a transformative function to each emission, and then emits those transformed values as its own ordinary Observable sequence.
Each subscription to this new Observable will trigger a new subscription to the underlying multicast Observable.
Following the RXJS example at reactive.io docu:
In [34]:
rst(O.multicast)
# show actions on intermediate subject:
show = False
def emit(obs):
'instead of range we allow some logging:'
for i in (1, 2):
v = rand()
log('emitting', v)
obs.on_next(v)
log('complete')
obs.on_completed()
class MySubject:
def __init__(self):
self.rx_subj = Subject()
if show:
log('New Subject %s created' % self)
def __str__(self):
return str(hash(self))[-4:]
def __getattr__(self, a):
'called at any attr. access, logging it'
if not a.startswith('__') and show:
log('RX called', a, 'on MySub\n')
return getattr(self.rx_subj, a)
subject1 = MySubject()
subject2 = MySubject()
source = O.create(emit).multicast(subject2)
# a "subscription" *is* a disposable
# (the normal d we return all the time):
d, observer = subs(source, return_subscriber=True)
ds1 = subject1.subscribe(observer)
ds2 = subject2.subscribe(observer)
print ('we have now 3 subscriptions, only two will see values.')
print('start multicast stream (calling connect):')
connected = source.connect()
d.dispose()
In [58]:
rst(O.let)
# show actions on intermediate subject:
show = True
def emit(obs):
'instead of range we allow some logging:'
v = rand()
log('emitting', v)
obs.on_next(v)
log('complete')
obs.on_completed()
source = O.create(emit)
# following the RXJS example:
header("without let")
d = subs(source.concat(source))
d = subs(source.concat(source))
header("now with let")
d = subs(source.let(lambda o: o.concat(o)))
d = subs(source.let(lambda o: o.concat(o)))
# TODO: Not understood:
# "This operator allows for a fluent style of writing queries that use the same sequence multiple times."
# ... I can't verify this, the source sequence is not duplicated but called every time like a cold obs.
A connectable Observable resembles an ordinary Observable, except that it does not begin emitting items when it is subscribed to, but only when the Connect operator is applied to it. In this way you can prompt an Observable to begin emitting items at a time of your choosing.
In [39]:
rst(O.replay)
def emit(obs):
'continuous emission'
for i in range(0, 5):
v = 'nr %s, value %s' % (i, rand())
log('emitting', v, '\n')
obs.on_next(v)
sleep(0.2)
def sideeffect(*v):
log("sync sideeffect (0.2s)", v, '\n')
sleep(0.2)
log("end sideeffect", v, '\n')
def modified_stream(o):
log('modified_stream (take 2)')
return o.map(lambda x: 'MODIFIED FOR REPLAY: %s' % x).take(2)
header("playing and replaying...")
subject = Subject()
cold = O.create(emit).take(3).do_action(sideeffect)
assert not getattr(cold, 'connect', None)
hot = cold.multicast(subject)
connect = hot.connect # present now.
#d, observer = subs(hot, return_subscriber=True, name='normal subscriber\n')
#d1 = subject.subscribe(observer)
published = hot.replay(modified_stream, 1000, 50000)
d2 = subs(published, name='Replay Subs 1\n')
#header("replaying again")
#d = subs(published, name='Replay Subs 2\n')
log('calling connect now...')
d3 = hot.connect()
If you apply the Replay operator to an Observable
In [23]:
def mark(x):
return 'marked %x' % x
def side_effect(x):
log('sideeffect %s\n' % x)
for i in 1, 2:
s = O.interval(100).take(3).do_action(side_effect)
if i == 2:
sleep(1)
header("now with publish - no more sideeffects in the replays")
s = s.publish()
reset_start_time()
published = s.replay(lambda o: o.map(mark).take(3).repeat(2), 3)
d = subs(s, name='Normal\n')
d = subs(published, name='Replayer A\n')
d = subs(published, name='Replayer B\n')
if i == 2:
d = s.connect()
In [ ]:
A connectable Observable resembles an ordinary Observable, except that it does not begin emitting items when it is subscribed to, but only when the Connect operator is applied to it. In this way you can prompt an Observable to begin emitting items at a time of your choosing.
The RefCount operator automates the process of connecting to and disconnecting from a connectable Observable. It operates on a connectable Observable and returns an ordinary Observable. When the first observer subscribes to this Observable, RefCount connects to the underlying connectable Observable. RefCount then keeps track of how many other observers subscribe to it and does not disconnect from the underlying connectable Observable until the last observer has done so.
In [33]:
rst(O.interval(1).publish)
publ = O.interval(1000).take(2).publish().ref_count()
# be aware about potential race conditions here
subs(publ)
subs(publ)
Out[33]:
In [41]:
rst(O.interval(1).share)
def sideffect(v):
log('sideeffect %s\n' % v)
publ = O.interval(200).take(2).do_action(sideeffect).share()
'''
When the number of observers subscribed to published observable goes from
0 to 1, we connect to the underlying observable sequence.
published.subscribe(createObserver('SourceA'));
When the second subscriber is added, no additional subscriptions are added to the
underlying observable sequence. As a result the operations that result in side
effects are not repeated per subscriber.
'''
subs(publ, name='SourceA')
subs(publ, name='SourceB')
Out[41]:
You can use the publish operator to convert an ordinary Observable into a ConnectableObservable.
Call a ConnectableObservable’s connect method to instruct it to begin emitting the items from its underlying Observable to its Subscribers.
The connect method returns a Disposable. You can call that Disposable object’s dispose method to instruct the Observable to stop emitting items to its Subscribers.
You can also use the connect method to instruct an Observable to begin emitting items (or, to begin generating items that would be emitted) even before any Subscriber has subscribed to it.
In this way you can turn a cold Observable into a hot one.
In [62]:
rst(O.interval(1).publish().connect)
published = O.create(emit).publish()
def emit(obs):
for i in range(0, 10):
log('emitting', i, obs.__class__.__name__, hash(obs))
# going nowhere
obs.on_next(i)
sleep(0.1)
import thread
thread.start_new_thread(published.connect, ())
sleep(0.5)
d = subs(published, scheduler=new_thread_scheduler)
In [ ]: